# -*- coding: utf-8 -*-
from airflow.operators.python import PythonOperator
from jms.dim import jms_dim__dim_cn_three_codes_change_dt
from jms.es import jms_es__signed_address_warehouse
from jms.es.threecodes_update_esaddress_new.thrcodes_esaddr_new import jms_thrcodes_esaddr_new
from utils.alerts.es_threeCodeOnSuccess import es_threeCodeOnSuccess


def kwargs():
    """Return the settings dict passed to ``es_threeCodeOnSuccess``.

    A new dict is built on every call, so callers may mutate the result
    freely. Keys: ``db``, ``table``, ``desc`` and ``dingding_conn_id``.
    """
    return dict(
        db="jms_dim",
        table="dim_cn_second_codes_change_error_dt",
        # English: "outlets whose second-segment code failed to update in
        # the ES address library".
        desc="es地址库更新二段码失败的网点",
        dingding_conn_id="dingding_es_address",
    )


# Task that invokes ``jms_thrcodes_esaddr_new`` with two positional arguments.
jms_es__threecodes_update_esaddress_new = PythonOperator(
    task_id='jms_es__threecodes_update_esaddress_new',
    python_callable=jms_thrcodes_esaddr_new,
    # Jinja templates rendered by Airflow before the callable runs.
    # ``cst_ds_nodash`` is a filter defined outside this file -- presumably it
    # formats the execution date as a CST date string; confirm at its source.
    op_args=[
        '{{ execution_date | cst_ds_nodash }}',
        '{{ var.value.env }}',
    ],
    # Scheduling: never retried; takes 3 slots of the 'threecode_es' pool.
    priority_weight=30,
    retries=0,
    pool='threecode_es',
    pool_slots=3,
    # NOTE: no execution_timeout is set. A disabled draft capped the run at the
    # time remaining until 07:00 today, with a floor of one second:
    # execution_timeout=max(datetime.now().replace(hour=7, minute=0, second=0) - datetime.now(), timedelta(seconds=1)),
    # Notifications. ``es_threeCodeOnSuccess`` is called once, when this module
    # is loaded; whatever it returns is registered as the success callback.
    email=['yushuo@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    on_success_callback=es_threeCodeOnSuccess(kwargs()),
)

# Both imported tasks are upstream of this task.
[
    jms_es__signed_address_warehouse,
    jms_dim__dim_cn_three_codes_change_dt,
] >> jms_es__threecodes_update_esaddress_new
